package org.spider.api;

import lombok.extern.slf4j.Slf4j;

import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Spider 线程提交取出模型
 */
@Slf4j
public class SpiderDemo {
    /**
     * eg2:<br>
     * 0->1->2 <br>
     * |->3->4
     *
     * @param args
     */
    public static void main(String[] args) throws InterruptedException {
        SpiderDemo demo = new SpiderDemo();
//        Node root = new Node("0");
//        Node p = root;
//        for (int i = 1; i <= 5; i++) {
//            p.next = new Node(String.valueOf(i));
//            p = p.next;
//        }
//        demo.executeRoot(root);

        Node root2 = new Node("0");
        Node node1 = new Node("1");
        Node node2 = new Node("2");
        Node node3 = new Node("3");
        Node node4 = new Node("4");
        root2.nextNodeList.add(node1);
        root2.nextNodeList.add(node3);
        node1.nextNodeList.add(node2);
        node3.nextNodeList.add(node4);
        demo.executeRoot(root2);

        CrawlMonitor monitor=new CrawlMonitor();
        for (int i = 0; i < 10; i++) {
            monitor.submitCrawlJob(new Object());
        }
        Runnable r = ()->monitor.run();
        new Thread(r).start();
        Thread.sleep(2000);
        System.out.println("开始暂停");
        monitor.stop();
    }

    /*
     23:51:11.700 [pool-1-thread-2] INFO org.spider.api.SpiderDemo - 执行--0
     23:51:11.714 [pool-1-thread-1] INFO org.spider.api.SpiderDemo - node--0 完成了
     23:51:11.715 [pool-1-thread-2] INFO org.spider.api.SpiderDemo - 执行--1
     23:51:11.715 [pool-1-thread-3] INFO org.spider.api.SpiderDemo - 执行--3
     23:51:11.738 [pool-1-thread-1] INFO org.spider.api.SpiderDemo - node--1 完成了
     23:51:11.738 [pool-1-thread-3] INFO org.spider.api.SpiderDemo - 执行--2
     23:51:11.754 [pool-1-thread-1] INFO org.spider.api.SpiderDemo - node--3 完成了
     23:51:11.754 [pool-1-thread-3] INFO org.spider.api.SpiderDemo - 执行--4
     23:51:11.770 [pool-1-thread-1] INFO org.spider.api.SpiderDemo - node--2 完成了
     23:51:11.786 [pool-1-thread-1] INFO org.spider.api.SpiderDemo - node--4 完成了
     23:51:11.802 [main] INFO org.spider.api.SpiderDemo - 所有任务完成了
     */

    int thread = 32;
    ExecutorService executorService;
    Queue<Future<?>> futureQueue;

    public void executeRoot(Node root) {
        executorService = Executors.newFixedThreadPool(4);
        futureQueue = new LinkedBlockingQueue<>();
        Runnable r = () -> {
            executeNode(root);
            while (!futureQueue.isEmpty()) {
                Optional<Future<?>> first = futureQueue.stream().filter(Future::isDone).findFirst();
                if (first.isPresent()) {
                    Future<?> future = first.get();
                    futureQueue.remove(future);
                    try {
                        Node completedNode = (Node) future.get();
                        log.info("node--" + completedNode.id + " 完成了");
//                        executeNext(completedNode.next);
                        executeNextList(completedNode);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
                try {
                    Thread.sleep(10);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        };
        Future<?> future = executorService.submit(r);
        //这个不能add,因为最后会有这个线程，那么futureQueue中会存在这个Future，while无法终止，线程无法结束，死循环了.
//        futureQueue.add(future);
        try {
            future.get();
            log.info("所有任务完成了");
            executorService.shutdownNow();
        } catch (Throwable t) {
            t.printStackTrace();
        }
    }

    public void executeNode(Node node) {
        Runnable r = () -> {
//            node.flag.set(true);
            log.info("执行--" + node.id);
        };
        //没执行过
        if (!node.flag.get()) {
            Future<?> future = executorService.submit(r, node);
            futureQueue.add(future);
        }
    }

    public void executeNext(Node node) {
        while (node != null) {
            executeNode(node);
            node = node.next;
        }
    }

    public void executeNextList(Node node) {
        for (Node e : node.nextNodeList) {
            executeNode(e);
        }
    }

    static class Node {
        String id;
        Node next;

        List<Node> nextNodeList = new ArrayList<>();

        AtomicBoolean flag = new AtomicBoolean(false);

        void run() {
            System.out.println(id);
        }

        public Node(String id, Node next) {
            this.id = id;
            this.next = next;
        }

        public Node(String id) {
            this(id, null);
        }
    }


    /**
     * LinkedHashMap没有队列的功能，而等待集合应该以先进先出的方式执行<br>
     * 因此手动写一个双向链表+map
     * @param <V>
     */
    class SimpleLinkedHashMap<V> {
        private DNode dummyHead,dummyTail;
        private Map<String,DNode> map;

        public SimpleLinkedHashMap() {
            dummyHead = new DNode();
            dummyTail = new DNode();
            dummyHead.next = dummyTail;
            dummyTail.prev = dummyHead;
            map = new HashMap<>();
        }

        public void put(String key,Object value){
            DNode node = new DNode(key,value);

        }

        private void appendToTail(DNode node){

        }

        private DNode getFirst(){
            DNode first = null;
            if(isEmpty()) return null;

            return first;
        }

        public boolean isEmpty(){
            return dummyHead.next == dummyTail;
        }
    }

    class DNode{
        String key;
        Object value;
        DNode next,prev;

        public DNode(String key,Object value, DNode next, DNode prev) {
            this.key = key;
            this.value=value;
            this.next = next;
            this.prev = prev;
        }

        public DNode(String key,Object value){
            this(key,value,null,null);
        }

        public DNode(){}
    }
}

/**
 * 页面请求的串行执行器<br>
 *  (key,Job)进来 (key,result)回去
 */
class CrawlMonitor{
    private static final Queue<Object> waitQueue;
    private static final ReentrantLock lock;
    private static final Condition condition;
    private static final Map<String,Object> succeedSet ;
    static boolean isRunning = true;

    static {
        waitQueue = new LinkedList<>();
        lock=new ReentrantLock();
        condition = lock.newCondition();
        succeedSet = new HashMap<>();
    }

    /**
     * 是否应该返回个key，让等待的线程可以知道如何获取
     * @param job
     */
    public void submitCrawlJob(Object job){
        lock.lock();
        try {
            waitQueue.offer(job);
        }
        finally {
            lock.unlock();
        }
    }
    // 没ok
    public void run(){
        while ( isRunning ){
            lock.lock();
            try {
                if( !waitQueue.isEmpty() ){
                    Object job = waitQueue.poll();
                    //执行
                    Object result=null;
                    //执行完成后,结果存入succeedSet,然后通知
//                        succeedSet.put();
//                    condition.signalAll();
                    System.out.println("完成了一个任务:"+job);
                }else {
                    try {
                        Thread.sleep(1);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }finally {
                lock.unlock();
            }
        }
        System.out.println("停止了...");
    }

    public void stop(){
        isRunning = false;
    }

    // ok了
    public static Object get(String key){
        Object result = null;
        lock.lock();
        try {
            while (succeedSet.get(key) ==null){
                condition.await();
            }
            result = succeedSet.get(key);
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
        return result;
    }
}
